package com.bieber.server;

import com.bieber.common.Constants;
import com.bieber.common.Node;
import com.bieber.registry.Notification;
import com.bieber.registry.Registry;
import com.bieber.registry.ZookeeperRegistry;
import com.bieber.registry.config.RegistryConfig;
import com.bieber.server.config.ServerConfig;
import com.bieber.server.exception.FileTransportException;
import com.bieber.server.pack.FilePackage;
import com.bieber.server.remote.Server;
import io.netty.channel.Channel;
import org.apache.commons.cli.*;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by bieber on 2015/8/19.
 * 整个传输的封装类
 */
public class Transporter {
    
    private static volatile  boolean running=false;
    
    private static final ConcurrentHashMap<Channel,Acceptor> ACCEPTOR_MAP = new ConcurrentHashMap<Channel, Acceptor>();

    private static final ConcurrentHashMap<Node,Sender> SENDER_MAP = new ConcurrentHashMap<Node, Sender>();

    private static ServerConfig config;
    
    private static  HelpFormatter hf;
    
    private static Server server;

    private static Registry registry;

    public static Acceptor registerAcceptor(Channel channel, FilePackage filePackage){
        if(!running){
            throw new RuntimeException("Transporter server had stopped.....");
        }
        if(!ACCEPTOR_MAP.containsKey(channel)){
            ACCEPTOR_MAP.putIfAbsent(channel, new Acceptor(config, channel, filePackage.getNodeName()));
        }
        Acceptor writer = ACCEPTOR_MAP.get(channel);
        try {
            writer.startAcceptFile(filePackage.getFileName(), filePackage.getFileSize());
        } catch (FileTransportException e) {
            throw new RuntimeException("failted to star file writer for "+filePackage.getFileName(),e);
        }
        return writer;
    }
    
    public static void registerSender(Node node, Sender reader){
        if(!running){
            return;
        }
        if(!SENDER_MAP.containsKey(node)){
            SENDER_MAP.putIfAbsent(node, reader);
        }
    }

    public static Sender getSender(Node node){
        return SENDER_MAP.get(node);
    }

    public static Sender unRegisterSender(Node node){
       return SENDER_MAP.remove(node);
    }


    public static Acceptor getAcceptor(Channel channel){
        return ACCEPTOR_MAP.get(channel);
    }
    

    private static Options generateOptions(){
        Options options = new Options();
        Option option = new Option("c","configFile",true,"Transporter server config file path");
        options.addOption(option);
        option = new Option("pc","printConfig",false,"Print all Transporter server config");
        options.addOption(option);
        option = new Option("n","name",true,"Transporter server name");
        options.addOption(option);
        option = new Option("p","port",true,"Transporter port");
        options.addOption(option);
        option = new Option("h","help",false,"Get Help");
        options.addOption(option);
        return options;
    }

    private static CommandLine parseCommandLine(CommandLineParser parser,String[] args,Options options){
        hf = new HelpFormatter();
        hf.setWidth(110);
        CommandLine commandLine = null;
        try {
            commandLine = parser.parse(options,args);
            if(commandLine.hasOption("h")){
                hf.printHelp(Constants.APP_NAME,options,true);
            }
        } catch (ParseException e) {
            hf.printHelp(Constants.APP_NAME,options,true);
        }
        return commandLine;
    }



    public static void start(String[] args){
        if(running){
            throw new IllegalStateException("Transporter server had started!");
        }
        running=true;
        config = new ServerConfig();
        RegistryConfig registryConfig = new RegistryConfig();
        Options options = generateOptions();
        CommandLine commandLine = parseCommandLine(new PosixParser(),args,options);
        if(commandLine==null){
            System.exit(0);
            return ;
        }
        Constants.COMMON_LOGGER.info("Starting loading config.....");
        if(commandLine.hasOption("c")) {
            FileInputStream inputStream=null;
            try {
                Constants.COMMON_LOGGER.info("loading properties file [{}]",commandLine.getOptionValue("c"));
                inputStream=new FileInputStream(commandLine.getOptionValue("c"));
                Properties properties = new Properties();
                properties.load(inputStream);
                config.loadFromProperties(properties);
                registryConfig.loadFromProperties(properties);
            } catch (Exception e) {
                Constants.COMMON_LOGGER.error("loading config from properties occur an exception",e);
            }finally {
                IOUtils.closeQuietly(inputStream);
            }
        }
        if(commandLine.hasOption("n")){
            config.setNodeName(commandLine.getOptionValue("n"));
        }
        if(commandLine.hasOption("p")){
            config.setPort(Integer.parseInt(commandLine.getOptionValue("p")));
        }
        Constants.COMMON_LOGGER.info("Loaded config.....");
        if(commandLine.hasOption("pc")){
            Constants.COMMON_LOGGER.info(config.toString());
        }
        validateConfig(options);
        Constants.COMMON_LOGGER.info("send file directory [{}],access file directory [{}],server port [{}],node name [{}]",config.getSendFileDir(),config.getAcceptFileDir(),config.getPort(),config.getNodeName());
        registry  = new ZookeeperRegistry(registryConfig);
        registry.staticRegister(Constants.CONFIGURATOR_PATH);
        server = new Server(config,registry);
        server.initialize();
        server.start();
        registry.staticRegister(Constants.CONFIGURATOR_PATH+"/"+Constants.SHUTDOWN_COMMAND);
        registry.staticRegister(Constants.STATISTICS);
        registry.dynamicRegister(Constants.STATISTICS + "/" + config.getNodeName());
        registry.subscribe(Constants.CONFIGURATOR_PATH + "/" + Constants.SHUTDOWN_COMMAND, new Notification() {
            @Override
            public void notify(List<String> children) {
                for(String child:children){
                    if(config.getNodeName().equals(child)){
                        stop();
                        break;
                    }
                }
            }
        });
    }


    public static void reportCurrentAcceptStatus(String fileName, long fileSize, long writeSize) {
        try {
            if(fileSize==0){
                fileSize=1;
                writeSize=1;
            }
            StringBuffer stringBuffer = new StringBuffer(fileName);
            stringBuffer.append("->").append(fileSize).append("->").append(writeSize).append("->").append(writeSize*100/fileSize);
            registry.updateData(Constants.STATISTICS+"/"+config.getNodeName(),stringBuffer.toString().getBytes(config.getCharSet()));
        } catch (IOException e) {
            Constants.COMMON_LOGGER.warn("failed to report current accept status",e);
        }
    }

    private static void validateConfig(Options options){
        if(StringUtils.isEmpty(config.getNodeName())){
            hf.printHelp(Constants.APP_NAME,options,true);
            throw new IllegalArgumentException("node name must be set");
        }
    }

    public static boolean isRunning(){
        return running;
    }

    public static void stop(){
        Constants.COMMON_LOGGER.info("stopping transporter server.....");
        running=false;
        Thread thread = new Thread(){
            @Override
            public void run() {
                while(true){
                    boolean canShutdown = true;
                    try {
                        Constants.COMMON_LOGGER.info("starting check writers status.....");
                        for(Map.Entry<Channel,Acceptor> entry: ACCEPTOR_MAP.entrySet()){
                            if(entry.getValue().isWorking()){
                                canShutdown=false;
                                break;
                            }
                        }
                        Constants.COMMON_LOGGER.info("starting check reader status......");
                        for(Map.Entry<Node,Sender> entry: SENDER_MAP.entrySet()){
                            if(entry.getValue().isWorking()){
                                canShutdown=false;
                                break;
                            }
                        }
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Constants.COMMON_LOGGER.warn("check reader and writer status occur an exception ",e);
                        canShutdown=true;
                    }finally {
                        if(canShutdown){
                            server.stop();
                            registry.unregister(Constants.CONFIGURATOR_PATH+"/"+Constants.SHUTDOWN_COMMAND+"/"+config.getNodeName());
                            registry.unregister(Constants.CONFIGURATOR_PATH + "/" + config.getNodeName());
                            registry.stop();
                            Constants.COMMON_LOGGER.info("stopped transporter server......");
                            System.exit(0);
                        }
                    }
                }
            }
        };
        thread.start();
    }
    
    
}
